package com.dyrnq.seckill.mq.rabbitmq;

import com.dyrnq.seckill.mq.MessageVo;
import com.dyrnq.seckill.mq.Producer;
import com.dyrnq.seckill.service.impl.SeckillOrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.io.IOException;

import static com.dyrnq.seckill.mq.rabbitmq.RabbitConstants.RABBIT_LISTENER_CONTAINER_FACTORY;

@Component
@Slf4j
@ConditionalOnProperty(value = "queue.impl", havingValue = "rabbitmq")
@RequiredArgsConstructor
public class RabbitMQConsumer implements ChannelAwareMessageListener {

    private final SeckillOrderService seckillOrderService;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Consume the RabbitMQ message and process it.
     *
     * @param properties The message properties.
     * @param msg        The parsed message body as a {@link MessageVo} object.
     * @param channel    The RabbitMQ channel.
     */
    private void consume(MessageProperties properties, MessageVo msg, Channel channel) {
        log.info("Message GoodsId {} , UserId {} | Consumed {} | Channel {}", msg.getGoodsId(), msg.getUserId(), properties.getConsumerQueue(), channel.getConnection());
        long goods_id = msg.getGoodsId();
        long user_id = msg.getUserId();
        seckillOrderService.insert(goods_id, user_id);
    }

    /**
     * This consumer listens to queued events in RabbitMQ and processes them.
     */
    @Override
    public void onMessage(Message message, Channel channel) throws InterruptedException {
        if (message == null) {
            log.warn("Received an empty message.");
            return;
        }

        MessageVo messageVo;
        try {
            messageVo = objectMapper.readValue(new String(message.getBody()), MessageVo.class);
        } catch (IOException ex) {
            log.error("Failed to parse message body due to: ", ex);
            return;
        }
        MessageProperties properties = message.getMessageProperties();
        consume(properties, messageVo, channel);
    }

    @RabbitListener(containerFactory = RABBIT_LISTENER_CONTAINER_FACTORY, queues = {Producer.QUEUE_NAME})
    public void receive(Channel channel, Message message) throws InterruptedException {
        this.onMessage(message, channel);
    }
}